
Make MonitorJobPoller populate thread context for downstream request interception#2107

Merged
toepkerd merged 11 commits into opensearch-project:main from toepkerd:main on Apr 29, 2026

Conversation

Collaborator

@toepkerd toepkerd commented Apr 28, 2026

Description

The job poller now populates the thread context with information about the external data source, so that downstream flows can intercept requests and make Search and PPL calls to that external data source.
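
As a rough illustration of the idea, the sketch below models the thread context as a plain in-memory header map. `FakeThreadContext`, `ExternalTarget`, and `populateForTarget` are hypothetical stand-ins rather than the plugin's classes, and the key strings for `SERVICE_NAME_HEADER` and `OPENSEARCH_ENDPOINT_HEADER` are placeholders assumed for the example. The actual change writes to OpenSearch's ThreadContext.

```kotlin
// Hypothetical stand-in for a per-thread header store; not OpenSearch's ThreadContext.
class FakeThreadContext {
    private val headers = mutableMapOf<String, String>()

    // Rejects duplicate keys so that a stale header is never silently overwritten.
    fun putHeader(key: String, value: String) {
        require(key !in headers) { "value for key [$key] already present" }
        headers[key] = value
    }

    fun getHeader(key: String): String? = headers[key]
}

// Only this key string appears in the PR discussion.
const val IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job"

// Placeholder key strings, assumed for illustration only.
const val SERVICE_NAME_HEADER = "example-service-name"
const val OPENSEARCH_ENDPOINT_HEADER = "example-opensearch-endpoint"

// Hypothetical description of the external data source a monitor targets.
data class ExternalTarget(val serviceName: String, val endpoint: String)

// Writes the headers that a downstream interceptor would read.
fun populateForTarget(ctx: FakeThreadContext, target: ExternalTarget) {
    ctx.putHeader(IS_BACKGROUND_JOB_HEADER, "true")
    ctx.putHeader(SERVICE_NAME_HEADER, target.serviceName)
    ctx.putHeader(OPENSEARCH_ENDPOINT_HEADER, target.endpoint)
}
```

A downstream interceptor would then read the same keys back from the context to decide where to route the Search or PPL call.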

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…asis interception

Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
@toepkerd toepkerd marked this pull request as ready for review April 28, 2026 16:25
@toepkerd toepkerd changed the title Make MonitorJobPoller populate thread context for Oasis interception Make MonitorJobPoller populate thread context for downstream request interception Apr 28, 2026
Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Comment thread alerting/src/main/kotlin/org/opensearch/alerting/service/MonitorJobPoller.kt Outdated
const val POLLER_THREAD_COUNT = 10
const val POLL_INTERVAL_MS = 1000L

// thread context header keys for request interception
Collaborator

nit: I think we should follow a consistent naming convention for all these headers, but this needs to be discussed across the team.

Collaborator Author

These constants are actually existing header names from other code owners that we are reusing. The only new and original const here is IS_BACKGROUND_JOB_HEADER = "alerting-is-background-job"

threadContext.putHeader(IS_BACKGROUND_JOB_HEADER, "true")

// TODO: in long term, may need to generalize to aos data source type
threadContext.putHeader(SERVICE_NAME_HEADER, "aoss")
Collaborator

Is this information in the monitor config already? If so, we can pull it from there instead of hard-coding it.

Collaborator Author

I don't think it's anywhere in the Monitor config, but there is a REMOTE_METADATA_SERVICE_NAME setting I could retrieve the value from. Will explore reading from this value instead of hard-coding aoss.

Collaborator Author

@toepkerd toepkerd Apr 28, 2026

If we go with the approach of reading from REMOTE_METADATA_SERVICE_NAME, we must be mindful of the conventions for naming services. For collection data sources, I think we can trust that the service name will always be aoss.

In the long term, if we support domain data sources, we will need to match existing service name conventions in the interception logic. Whether they favor aos or es as the service name, our logic for populating these remote metadata settings (outside the scope of this PR) will need to be ready to match that convention if reading from settings is to work.

Member

@riysaxen-amzn and @MohammedAghil can help ensure that this header's value matches exactly what the header needs, or what a setting can hold as a key-value map in which the key is target.type and the value is the header value.

Collaborator Author

I see, I misunderstood the monitor.target.type field, thanks for clarifying. I will double check the values that this field can take on (other than the default value of "local"). I will add fail-fast validations for the type.
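
A minimal sketch of what such a fail-fast validation could look like, assuming the mapping from target type to service name is supplied as a plain map. The helper name `resolveServiceName` and its error messages are hypothetical, and `"collection"` in the usage note is only an assumed example key, not a confirmed value of `monitor.target.type`.

```kotlin
// Hypothetical helper: maps a monitor target type to the service name header value.
// The mapping is passed in rather than hard-coded, so unknown types fail immediately.
fun resolveServiceName(targetType: String?, typeToService: Map<String, String>): String {
    val type = targetType?.trim().orEmpty()
    require(type.isNotEmpty()) { "monitor target type must not be null or blank" }
    return typeToService[type]
        ?: throw IllegalArgumentException(
            "invalid target type [$type]; known types: ${typeToService.keys}"
        )
}
```

For instance, `resolveServiceName("collection", mapOf("collection" to "aoss"))` would return `"aoss"`, while a type missing from the map (such as the default `"local"` here) would throw before any header is written.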

)
}

if (region.isBlank()) {
Member

This check should run before the poller threads themselves are bootstrapped, near:
val provider = requireNotNull(accountIdProvider) { "accountIdProvider must be set before starting" }
val sqs = requireNotNull(sqsClient) { "sqsClient must be set before starting" }
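
One way to read this suggestion is sketched below, assuming the region is available as a constructor argument. `PollerSketch`, its fields, and the region error message are hypothetical and only illustrate validating everything once, before any thread is created.

```kotlin
// Hypothetical poller skeleton showing where startup validation could live.
class PollerSketch(
    private val accountIdProvider: (() -> String)?,
    private val region: String
) {
    var started = false
        private set

    fun start() {
        // Validate all configuration once, before any poller thread exists.
        requireNotNull(accountIdProvider) { "accountIdProvider must be set before starting" }
        require(region.isNotBlank()) { "region must be set before starting" }
        started = true // the real poller would bootstrap its threads here
    }
}
```

With this shape, a blank region surfaces as a single startup failure instead of an error repeated inside every polling loop.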

threadContext.putHeader(SERVICE_NAME_HEADER, "aoss")

// external customer data source endpoint, to run search/ppl against
threadContext.putHeader(OPENSEARCH_ENDPOINT_HEADER, monitor.target!!.endpoint)
Member

plz check that it's not null/empty before you set it.
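
A small sketch of such a guard, assuming the endpoint is a nullable string on the monitor's target. The helper `requireEndpoint` and its message are hypothetical. It replaces the non-null assertion (`!!`) with an explicit check that also rejects empty or whitespace-only values.

```kotlin
// Hypothetical helper: returns a usable endpoint or fails with a descriptive message,
// instead of relying on a non-null assertion (!!).
fun requireEndpoint(endpoint: String?, monitorId: String): String {
    if (endpoint.isNullOrBlank()) {
        throw IllegalArgumentException("monitor [$monitorId] has no target endpoint")
    }
    // Smart cast: endpoint is known to be a non-null String past the check above.
    return endpoint
}
```

The caller would pass the validated return value to `putHeader`, so a missing endpoint fails with a clear message rather than a bare NullPointerException.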

Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting(
Member

Will this default to an empty map?

Collaborator Author

@toepkerd toepkerd Apr 29, 2026

Yes.

If no settings with this prefix are configured anywhere, the MonitorJobPoller construction in AlertingPlugin.kt effectively gets an empty keySet() (like get(Settings.EMPTY)). This means every Monitor Job Poller job will fail while writing the thread context, with an "invalid target type" exception. This is intentional: values for these settings must be configured in opensearch.yml as follows:

plugins.alerting.monitor.target_type_to_service_name.target_type_1: service_name_1
plugins.alerting.monitor.target_type_to_service_name.target_type_2: service_name_2
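
To make the empty-map behavior concrete, here is a sketch that works on flat key-value settings rather than the OpenSearch `Settings` class. The helper `typeToServiceMap` is hypothetical; only the prefix is taken from the configuration lines above.

```kotlin
// Prefix taken from the opensearch.yml lines above.
const val TYPE_TO_SERVICE_PREFIX = "plugins.alerting.monitor.target_type_to_service_name."

// Hypothetical helper: extracts target type -> service name pairs from flat settings.
// With no matching keys the result is an empty map, so every later lookup fails fast.
fun typeToServiceMap(flatSettings: Map<String, String>): Map<String, String> =
    flatSettings
        .filterKeys { it.startsWith(TYPE_TO_SERVICE_PREFIX) }
        .mapKeys { (key, _) -> key.removePrefix(TYPE_TO_SERVICE_PREFIX) }
```

Given the two lines above, this would yield a map with the keys `target_type_1` and `target_type_2`. With nothing configured it yields an empty map, which matches the intentional failure described in the comment.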

)

val JOB_QUEUE_TARGET_TYPE_TO_SERVICE_NAME = Setting.groupSetting(
"plugins.alerting.external_scheduler.type_to_service.",
Member

@eirsep eirsep Apr 29, 2026

Can this be renamed to "plugins.alerting.monitor.target_type_to_service_name."?

Collaborator Author

Done

Member

@eirsep eirsep left a comment

minor comments but rest LGTM

Signed-off-by: Dennis Toepker <toepkerd@amazon.com>
@toepkerd toepkerd merged commit 642b702 into opensearch-project:main Apr 29, 2026
21 of 23 checks passed
5 participants